Greengrassコネクタを使って、Kinesis Data FirehoseからS3に保存してみた
はじめに
CX事業本部の佐藤です。 Greengrassではコネクタという機能があり、コネクタを使うことで、GreengrassデバイスからAWSサービスや他のクラウドサービスとの連携が容易になります。今回は、Kinesis Firehoseコネクタを使用してGreengrassデバイスからKinesis Firehoseへデータを送信し、S3に保存してみたいと思います。
Kinesis Data Firehoseの作成
Kinesis Data Firehoseの配信ストリームを作成します
- マネージメントコンソールからKinesisを選択します
- Kinesisのダッシュボードから配信ストリームの作成を選択します
- Delivery stream nameに適当な名前を入れます
- SourceはDirect PUT or other sourcesを選択します
- Nextをクリックします
- Process Recordsの設定になりますが、今回はレコードの変換などは行わないため、そのままNextをクリックします
- DestinationにAmazon S3を選択します
- S3 bucketにKinesis Firehoseから送信する対象のS3バケット名を選択します
- Nextをクリックします
- IAM roleのCreate new or chooseをクリックし、別タブに遷移した後、許可をクリックします
- 設定内容を確認し、Nextをクリックします
- 配信ストリームが作られ、StatusがActiveになればOKです
Greengrassグループの設定
次に、Greengrassグループを設定します。Greengrassグループの作成とデバイスへの証明書の設定などは割愛します。グループには適切なIAMロールを設定しておいてください。
Greengrass Lambdaの設定
- 対象のGreengrassグループを選択します
- Lambdaを選択し、Lambdaの追加を選択します
- 新しいLambdaの作成をクリックし、Lambdaの新規作成の画面に遷移します
- 関数名に適当な名前を入れて、ランタイムにPython3.7を選択し、関数の作成をクリックします
- Lambdaのソースコードに以下の内容を貼り付け、保存をクリックします。今回は、単純にKinesis Firehoseコネクタに
Message from Firehose Connector Test
という文字列をPublishしているだけです。import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : 'req_123' } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print(messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) def lambda_handler(event, context): publish_basic_message() return
- GreengrassにLambdaをデプロイするためには、エイリアスの発行を行わなければならないため、エイリアスの作成を行います。アクションをクリックし新しいバージョンを発行をクリックし、発行をクリックします。その後、エイリアスの作成をクリックし、名前に適当な名前を入れて、バージョンに先ほど発行したバージョン番号を選択します。
- Greengrassグループの画面に戻り、既存のLambdaの使用を選択します
- 先ほど作ったLambdaを選択し、次へをクリックします
- 先ほど発行したエイリアスを選択し、完了をクリックします
- GreengrassグループにLambdaが作成されます
Kinesis Firehoseコネクタの作成
次にKinesis Firehoseコネクタを作成します。
- 対象のGreengrassグループを選択し、コネクタを選択します
- コネクタの追加をクリックします
- コネクタの一覧から、Kinesis Firehoseを選択します
- Default delivery stream ARNに先ほど作成したKinesis Data FirehoseのArnを設定します
- 他の項目については、表示されているデフォルトの設定を入力します
- 追加をクリックします
これでコネクタの追加ができました。
サブスクリプションの作成
次にサブスクリプションの設定をします。今回は、test
というトピックがパブリッシュされたらGreengrass Lambdaが起動して、Greengrassデバイスからkinesisfirehose/message
トピックに再度パブリッシュします。Kinesis Firehoseコネクタはkinesisfirehose/message
トピックにパブリッシュされたペイロードを受け取り、バッファしたのち、S3に保存するという流れです。
以下のような、サブスクリプションを作成します
ソース | ターゲット | トピック |
---|---|---|
Kinesis Firehose | IoT Cloud | kinesisfirehose/message/status |
Greengrass Lambda | Kinesis Firehose | kinesisfirehose/message |
IoT Cloud | Greengrass Lambda | test |
- 対象のGreengrassグループを選択し、サブスクリプション選択します
- サブスクリプションの追加をクリックします
- ソースの選択とターゲットの選択に対象のリソースを選択し、次へをクリックします
- トピックフィルターに対応するトピックを入力し、次へをクリックします
- 確認し、完了をクリックします
- 以下のように設定されていることを確認します
デプロイ
必要な設定が終わったので、デバイスにデプロイします。
- 対象のGreengrassグループを選択します
- アクションを選択し、デプロイをクリックします
- 左上のステータスが正常に完了しましたになれば、デプロイOKです。もし失敗した場合は、証明書の設定などに不備がある可能性があります。
動作確認
デバイスにデプロイできたので、実際に動作確認してみます。
- マネージメントコンソールからIoT Coreを選択します
- メニューからテストを選択します
- トピックへサブスクライブするを選択し、
kinesisfirehose/message/status
を入力し、トピックへのサブスクライブをクリックします - トピックへの発行を選択し、
test
を入力しトピックに発行をクリックします - ちょっとして、サブスクライブしているトピックに以下のようなペイロードが流れてきたら成功です
{ "response": [ { "firehose_record_id": "hogehoge", "id": "req_123", "status": "success" } ] }
- Kinesis Data Firehoseのの設定で5分間バッファする設定になっているため、5分後に対象のS3バケットを確認し、オブジェクトが作成されていれば成功です
まとめ
Greengrassのコネクタを使ってみました。他にもRasberry PIのGPIOピンを制御するコネクタなど、面白そうなものがあるので、またブログにしてみたいと思います。